diff --git a/mypy.ini b/mypy.ini
--- a/mypy.ini
+++ b/mypy.ini
@@ -21,6 +21,9 @@
[mypy-htmlmin.*]
ignore_missing_imports = True
+[mypy-keycloak.*]
+ignore_missing_imports = True
+
[mypy-magic.*]
ignore_missing_imports = True
diff --git a/requirements.txt b/requirements.txt
--- a/requirements.txt
+++ b/requirements.txt
@@ -18,6 +18,7 @@
python-dateutil
pyyaml
requests
+python-keycloak
python-memcached
pybadges
sentry-sdk
@@ -26,5 +27,3 @@
# Doc dependencies
sphinx
sphinxcontrib-httpdomain
-
-
diff --git a/swh/web/admin/urls.py b/swh/web/admin/urls.py
--- a/swh/web/admin/urls.py
+++ b/swh/web/admin/urls.py
@@ -4,7 +4,7 @@
# See top-level LICENSE file for more information
from django.conf.urls import url
-from django.contrib.auth.views import LoginView, LogoutView
+from django.contrib.auth.views import LoginView
from django.shortcuts import redirect
from swh.web.admin.adminurls import AdminUrls
@@ -17,12 +17,11 @@
return redirect('admin-origin-save')
-urlpatterns = [url(r'^$', _admin_default_view, name='admin'),
- url(r'^login/$',
- LoginView.as_view(template_name='login.html'),
- name='login'),
- url(r'^logout/$',
- LogoutView.as_view(template_name='logout.html'),
- name='logout')]
+urlpatterns = [
+ url(r'^$', _admin_default_view, name='admin'),
+ url(r'^login/$',
+ LoginView.as_view(template_name='login.html'),
+ name='login'),
+]
urlpatterns += AdminUrls.get_url_patterns()
diff --git a/swh/web/assets/src/bundles/webapp/webapp.css b/swh/web/assets/src/bundles/webapp/webapp.css
--- a/swh/web/assets/src/bundles/webapp/webapp.css
+++ b/swh/web/assets/src/bundles/webapp/webapp.css
@@ -295,6 +295,11 @@
color: #fecd1b;
}
+.swh-position-left {
+ position: absolute;
+ left: 0;
+}
+
.swh-position-right {
position: absolute;
right: 0;
diff --git a/swh/web/auth/__init__.py b/swh/web/auth/__init__.py
new file mode 100644
diff --git a/swh/web/auth/backends.py b/swh/web/auth/backends.py
new file mode 100644
--- /dev/null
+++ b/swh/web/auth/backends.py
@@ -0,0 +1,105 @@
+# Copyright (C) 2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU Affero General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from datetime import datetime, timedelta
+from typing import Dict, Optional
+
+from django.http import HttpRequest
+
+from swh.web.auth.utils import get_oidc_client
+from swh.web.auth.models import OIDCUser
+
+_auth_exception: Dict[str, Exception] = {}
+
+
+def _oidc_user_from_info(userinfo):
+ # create a Django user that will not be saved to database
+ user = OIDCUser(id=userinfo['user_id'],
+ username=userinfo['preferred_username'],
+ password='',
+ first_name=userinfo['given_name'],
+ last_name=userinfo['family_name'],
+ email=userinfo['email'])
+
+ # add OIDC userinfo to custom User proxy model
+ user.userinfo = userinfo
+
+ return user
+
+
+def _create_oidc_user(oidc_client, access_token):
+ # get OIDC userinfo
+ userinfo = oidc_client.userinfo(access_token)
+
+ # compute an integer user identifier for Django User model
+ # by concatenating all groups of the UUID4 user identifier
+ # generated by Keycloak and converting it from hex to decimal
+ user_id = int(''.join(userinfo['sub'].split('-')), 16)
+ userinfo['user_id'] = user_id
+
+ return _oidc_user_from_info(userinfo)
+
+
+class OIDCAuthorizationCodePKCEBackend:
+
+ _users: Dict[int, OIDCUser] = {}
+
+ def authenticate(self, request: HttpRequest, code: str, code_verifier: str,
+ redirect_uri: str) -> Optional[OIDCUser]:
+
+ user = None
+ try:
+ # get OpenID Connect client to communicate with Keycloak server
+ oidc_client = get_oidc_client()
+
+ # try to authenticate user with OIDC PKCE authorization code flow
+ oidc_profile = oidc_client.authorization_code(
+ code, redirect_uri, code_verifier=code_verifier)
+ access_token = oidc_profile['access_token']
+
+ # create Django user
+ user = _create_oidc_user(
+ oidc_client, oidc_profile['access_token'])
+
+ # decode JWT token
+ decoded_token = oidc_client.decode_token(access_token)
+
+ # get authentication init datetime
+ auth_datetime = datetime.fromtimestamp(decoded_token['auth_time'])
+
+ # compute OIDC tokens expiration date
+ oidc_profile['expire_datetime'] = (
+ auth_datetime +
+ timedelta(seconds=oidc_profile['expires_in']))
+ oidc_profile['refresh_expire_datetime'] = (
+ auth_datetime +
+ timedelta(seconds=oidc_profile['refresh_expires_in']))
+
+ # add OIDC profile to custom User proxy model
+ user.oidc_profile = oidc_profile
+
+ # register authenticated user
+ self._users[user.id] = user
+ except Exception as e:
+ global _auth_exception
+ _auth_exception[code_verifier] = e
+
+ return user
+
+ def get_user(self, user_id: int) -> Optional[OIDCUser]:
+ if user_id in self._users:
+ return self._users[user_id]
+ else:
+ return None
+
+
+def get_auth_exception(code_verifier: str) -> Exception:
+ """
+ Returns the caught exception when user authentication failed.
+
+ Args:
+ code_verifier: Code verifier used for authentication
+ """
+ return _auth_exception[code_verifier]
diff --git a/swh/web/auth/keycloak.py b/swh/web/auth/keycloak.py
new file mode 100644
--- /dev/null
+++ b/swh/web/auth/keycloak.py
@@ -0,0 +1,160 @@
+# Copyright (C) 2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU Affero General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import json
+
+from typing import Any, Dict, Optional
+from urllib.parse import urlencode
+
+from keycloak import KeycloakOpenID
+
+
+class KeycloakOpenIDConnect:
+ """
+ Wrapper class around python-keycloak to ease the interaction with Keycloak
+ for managing authentication and user permissions with OpenID Connect.
+ """
+
+ def __init__(self, server_url: str, realm_name: str, client_id: str,
+ realm_public_key: str = ''):
+ """
+ Args:
+ server_url: URL of the Keycloak server
+ realm_name: The realm name
+ client_id: The OpenID Connect client identifier
+ realm_public_key: The realm public key (will be dynamically
+ retrieved if not provided)
+ """
+ self._keycloak = KeycloakOpenID(
+ server_url=server_url,
+ client_id=client_id,
+ realm_name=realm_name,
+ )
+
+ self.server_url = server_url
+ self.realm_name = realm_name
+ self.client_id = client_id
+ self.realm_public_key = realm_public_key
+
+ def well_known(self) -> Dict[str, Any]:
+ """
+ Retrieve the OpenID Connect Well-Known URI registry from Keycloak.
+
+ Returns:
+ A dictionary filled with OpenID Connect URIS.
+ """
+ return self._keycloak.well_know()
+
+ def authorization_url(self, redirect_uri: str,
+ **extra_params: str) -> str:
+ """
+ Get OpenID Connect authorization URL to authenticate users.
+
+ Args:
+ redirect_uri: URI to redirect to once a user is authenticated
+ extra_params: Extra query parameters to add to the
+ authorization URL
+ """
+ auth_url = self._keycloak.auth_url(redirect_uri)
+ if extra_params:
+ auth_url += '&%s' % urlencode(extra_params)
+ return auth_url
+
+ def authorization_code(self, code: str, redirect_uri: str,
+ **extra_params: str) -> Dict[str, Any]:
+ """
+ Get OpenID Connect authentication tokens using Authorization
+ Code flow.
+
+ Args:
+ code: Authorization code provided by Keycloak
+ redirect_uri: URI to redirect to once a user is authenticated
+ (must be the same as the one provided to authorization_url)
+ extra_params: Extra parameters to add in the authorization request
+ payload.
+ """
+ return self._keycloak.token(
+ grant_type='authorization_code',
+ code=code,
+ redirect_uri=redirect_uri,
+ **extra_params)
+
+ def refresh_token(self, refresh_token: str) -> Dict[str, Any]:
+ """
+ Request a new access token from Keycloak using a refresh token.
+
+ Args:
+ refresh_token: A refresh token provided by Keycloak
+
+ Returns:
+ A dictionary filled with tokens info
+ """
+ return self._keycloak.refresh_token(refresh_token)
+
+ def decode_token(self, token: str,
+ options: Optional[Dict[str, Any]] = None
+ ) -> Dict[str, Any]:
+ """
+ Try to decode a JWT token.
+
+ Args:
+ token: A JWT token to decode
+ options: Options for jose.jwt.decode
+
+ Returns:
+ A dictionary filled with decoded token content
+ """
+ if not self.realm_public_key:
+ certs = self._keycloak.certs()
+ self.realm_public_key = json.dumps(certs['keys'][0])
+
+ return self._keycloak.decode_token(token, key=self.realm_public_key,
+ options=options)
+
+ def logout(self, refresh_token: str) -> None:
+ """
+ Logout a user by closing its authenticated session.
+
+ Args:
+ refresh_token: A refresh token provided by Keycloak
+ """
+ self._keycloak.logout(refresh_token)
+
+ def userinfo(self, access_token: str) -> Dict[str, Any]:
+ """
+ Return user information from its access token.
+
+ Args:
+ access_token: An access token provided by Keycloak
+
+ Returns:
+ A dictionary fillled with user information
+ """
+ return self._keycloak.userinfo(access_token)
+
+
+_keycloak_oidc: Dict[str, KeycloakOpenIDConnect] = {}
+
+
+def get_keycloak_oidc_client(server_url: str, realm_name: str,
+ client_id: str) -> KeycloakOpenIDConnect:
+ """
+ Instantiate a KeycloakOpenIDConnect class for a given client in a
+ given realm.
+
+ Args:
+ server_url: Base URL of a Keycloak server
+ realm_name: Name of the realm in Keycloak
+ client_id: Client identifier in the realm
+
+ Returns:
+ An object to ease the interaction with the Keycloak server
+ """
+ realm_client_id = f'{realm_name}_{client_id}'
+ if realm_client_id not in _keycloak_oidc:
+ _keycloak_oidc[realm_client_id] = KeycloakOpenIDConnect(server_url,
+ realm_name,
+ client_id)
+ return _keycloak_oidc[realm_client_id]
diff --git a/swh/web/auth/models.py b/swh/web/auth/models.py
new file mode 100644
--- /dev/null
+++ b/swh/web/auth/models.py
@@ -0,0 +1,56 @@
+# Copyright (C) 2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU Affero General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from typing import Any, Dict
+
+from django.contrib.auth.models import User
+
+
+class OIDCUser(User):
+ """
+ Custom User proxy model for remote users storing OpenID Connect
+ related data: profile containing authorization tokens and userinfo.
+
+ The model is also not saved to database as all users are already stored
+ in the Keycloak one.
+ """
+
+ _oidc_profile: Dict[str, Any] = {}
+ _userinfo: Dict[str, Any] = {}
+
+ class Meta:
+ app_label = 'swh.web.auth'
+ proxy = True
+
+ def get_oidc_profile(self) -> Dict[str, Any]:
+ return self._oidc_profile
+
+ def set_oidc_profile(self, oidc_profile: Dict[str, Any]):
+ self._oidc_profile = oidc_profile
+
+ def get_userinfo(self) -> Dict[str, Any]:
+ return self._userinfo
+
+ def set_userinfo(self, userinfo: Dict[str, Any]):
+ self._userinfo = userinfo
+
+ oidc_profile = property(get_oidc_profile, set_oidc_profile)
+ userinfo = property(get_userinfo, set_userinfo)
+
+ @property
+ def pk(self) -> int:
+ """
+ Enable to check if a user is a remote one in Django templates
+ without raising a django.template.base.VariableDoesNotExist
+ exception.
+ """
+ return 0
+
+ def save(self, **kwargs):
+ """
+ Override django.db.models.Model.save to avoid saving the remote
+ users to web application database.
+ """
+ pass
diff --git a/swh/web/auth/utils.py b/swh/web/auth/utils.py
new file mode 100644
--- /dev/null
+++ b/swh/web/auth/utils.py
@@ -0,0 +1,65 @@
+# Copyright (C) 2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU Affero General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import hashlib
+import os
+import re
+
+from base64 import urlsafe_b64encode
+from typing import Tuple
+
+from django.conf import settings
+
+from swh.web.auth.keycloak import (
+ KeycloakOpenIDConnect, get_keycloak_oidc_client
+)
+from swh.web.config import get_config
+
+
+def gen_oidc_pkce_codes() -> Tuple[str, str]:
+ """
+ Generates a code verifier and a code challenge to be used
+ with the OpenID Connect authorization code flow with PKCE
+ ("Proof Key for Code Exchange", see https://tools.ietf.org/html/rfc7636).
+
+ PKCE replaces the static secret used in the standard authorization
+ code flow with a temporary one-time challenge, making it feasible
+ to use in public clients.
+
+ The implementation is taken from that blog post:
+ https://www.stefaanlippens.net/oauth-code-flow-pkce.html
+ """
+ # generate a code verifier which is a long enough random alphanumeric
+ # string, only to be used "client side"
+ code_verifier = urlsafe_b64encode(os.urandom(60))
+ code_verifier_str = code_verifier.decode('utf-8')
+ code_verifier_str = re.sub(r'[^a-zA-Z0-9-\._~]+', '', code_verifier_str)
+
+ # create the PKCE code challenge by hashing the code verifier with SHA256
+ # and encoding the result in URL-safe base64 (without padding)
+ code_challenge = hashlib.sha256(code_verifier_str.encode('utf-8')).digest()
+ code_challenge_str = urlsafe_b64encode(code_challenge).decode('utf-8')
+ code_challenge_str = code_challenge_str.replace('=', '')
+
+ return code_verifier_str, code_challenge_str
+
+
+def get_oidc_client(client_id: str = '') -> KeycloakOpenIDConnect:
+ """
+ Instantiate a KeycloakOpenIDConnect class for a given client in the
+ SoftwareHeritage realm.
+
+ Args:
+ client_id: client identifier in the SoftwareHeritage realm
+
+ Returns:
+ An object to ease the interaction with the Keycloak server
+ """
+ if not client_id:
+ client_id = settings.OIDC_SWH_WEB_CLIENT_ID
+ swhweb_config = get_config()
+ return get_keycloak_oidc_client(swhweb_config['keycloak']['server_url'],
+ swhweb_config['keycloak']['realm_name'],
+ client_id)
diff --git a/swh/web/auth/views.py b/swh/web/auth/views.py
new file mode 100644
--- /dev/null
+++ b/swh/web/auth/views.py
@@ -0,0 +1,115 @@
+# Copyright (C) 2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU Affero General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import uuid
+
+from typing import cast
+
+from django.conf.urls import url
+from django.contrib.auth import authenticate, login, logout
+from django.http import HttpRequest
+from django.http.response import HttpResponse, HttpResponseRedirect
+
+from swh.web.auth.backends import get_auth_exception
+from swh.web.auth.models import OIDCUser
+from swh.web.auth.utils import gen_oidc_pkce_codes, get_oidc_client
+from swh.web.common.exc import handle_view_exception, BadInputExc
+from swh.web.common.utils import reverse
+
+
+def oidc_login(request: HttpRequest) -> HttpResponse:
+ """
+ Django view to initiate login process using OpenID Connect.
+ """
+ state = str(uuid.uuid4())
+ redirect_uri = reverse('oidc-login-complete', request=request)
+
+ code_verifier, code_challenge = gen_oidc_pkce_codes()
+
+ request.session['nonce'] = {
+ 'code_verifier': code_verifier,
+ 'state': state,
+ 'redirect_uri': redirect_uri,
+ 'next_path': request.GET.get('next'),
+ }
+
+ authorization_url_params = {
+ 'state': state,
+ 'code_challenge': code_challenge,
+ 'code_challenge_method': 'S256',
+ 'scope': 'openid',
+ }
+
+ try:
+ oidc_client = get_oidc_client()
+ authorization_url = oidc_client.authorization_url(
+ redirect_uri, **authorization_url_params)
+
+ return HttpResponseRedirect(authorization_url)
+ except Exception as e:
+ return handle_view_exception(request, e)
+
+
+def oidc_login_complete(request: HttpRequest) -> HttpResponse:
+ """
+ Django view to finalize login process using OpenID Connect.
+ """
+ try:
+ if 'nonce' not in request.session:
+ raise Exception('Login process has not been initialized.')
+
+ if 'code' not in request.GET and 'state' not in request.GET:
+ raise BadInputExc('Missing query parameters for authentication.')
+
+ state = request.GET['state']
+
+ nonce = request.session['nonce']
+
+ if state != nonce['state']:
+ raise BadInputExc('CSRF attack detected, aborting login process.')
+
+ user = authenticate(request=request,
+ code=request.GET['code'],
+ code_verifier=nonce['code_verifier'],
+ redirect_uri=nonce['redirect_uri'])
+
+ if user is None:
+ raise get_auth_exception(nonce['code_verifier'])
+
+ login(request, user)
+
+ del request.session['nonce']
+
+ redirect_url = nonce['next_path'] or request.build_absolute_uri('/')
+
+ return HttpResponseRedirect(redirect_url)
+ except Exception as e:
+ return handle_view_exception(request, e)
+
+
+def oidc_logout(request: HttpRequest) -> HttpResponse:
+ """
+ Django view to logout using OpenID Connect.
+ """
+ try:
+ user = request.user
+ logout(request)
+ if hasattr(user, 'oidc_profile'):
+ oidc_client = get_oidc_client()
+ user = cast(OIDCUser, user)
+ oidc_client.logout(user.oidc_profile['refresh_token'])
+
+ logout_url = reverse('logout', query_params={'remote_user': 1})
+ return HttpResponseRedirect(request.build_absolute_uri(logout_url))
+ except Exception as e:
+ return handle_view_exception(request, e)
+
+
+urlpatterns = [
+ url(r'^oidc/login/$', oidc_login, name='oidc-login'),
+ url(r'^oidc/login-complete/$', oidc_login_complete,
+ name='oidc-login-complete'),
+ url(r'^oidc/logout/$', oidc_logout, name='oidc-logout'),
+]
diff --git a/swh/web/config.py b/swh/web/config.py
--- a/swh/web/config.py
+++ b/swh/web/config.py
@@ -110,6 +110,10 @@
'es_workers_index_url': ('string', ''),
'history_counters_url': ('string', 'https://stats.export.softwareheritage.org/history_counters.json'), # noqa
'client_config': ('dict', {}),
+ 'keycloak': ('dict', {
+ 'server_url': 'http://localhost:8080/auth/',
+ 'realm_name': 'SoftwareHeritage'
+ }),
}
swhweb_config = {} # type: Dict[str, Any]
diff --git a/swh/web/settings/common.py b/swh/web/settings/common.py
--- a/swh/web/settings/common.py
+++ b/swh/web/settings/common.py
@@ -45,7 +45,7 @@
'swh.web.browse',
'webpack_loader',
'django_js_reverse',
- 'corsheaders'
+ 'corsheaders',
]
MIDDLEWARE = [
@@ -57,7 +57,7 @@
'django.contrib.auth.middleware.AuthenticationMiddleware',
'django.contrib.messages.middleware.MessageMiddleware',
'django.middleware.clickjacking.XFrameOptionsMiddleware',
- 'swh.web.common.middlewares.ThrottlingHeadersMiddleware'
+ 'swh.web.common.middlewares.ThrottlingHeadersMiddleware',
]
# Compress all assets (static ones and dynamically generated html)
@@ -289,3 +289,10 @@
CORS_ORIGIN_ALLOW_ALL = True
CORS_URLS_REGEX = r'^/badge/.*$'
+
+AUTHENTICATION_BACKENDS = [
+ 'django.contrib.auth.backends.ModelBackend',
+ 'swh.web.auth.backends.OIDCAuthorizationCodePKCEBackend',
+]
+
+OIDC_SWH_WEB_CLIENT_ID = 'swh-web'
diff --git a/swh/web/templates/layout.html b/swh/web/templates/layout.html
--- a/swh/web/templates/layout.html
+++ b/swh/web/templates/layout.html
@@ -71,6 +71,9 @@
+ -
+ Donate
+
-
Home
@@ -81,11 +84,20 @@
Documentation
-
- {% if user.is_authenticated and user.is_staff %}
+ {% url 'logout' as logout_url %}
+ {% if user.is_authenticated %}
Logged in as {{ user.username }},
- logout
+ {% if user.pk == 0 %}
+ logout
+ {% else %}
+ logout
+ {% endif %}
{% else %}
- Donate
+ {% if request.path != logout_url %}
+ login
+ {% else %}
+ login
+ {% endif %}
{% endif %}
@@ -160,7 +172,7 @@
Help
- {% if user.is_authenticated %}
+ {% if user.is_authenticated and user.is_staff %}
diff --git a/swh/web/templates/logout.html b/swh/web/templates/logout.html
--- a/swh/web/templates/logout.html
+++ b/swh/web/templates/logout.html
@@ -17,5 +17,11 @@
{% block content %}
You have been successfully logged out.
-Log in again.
+
+{% if 'remote_user' in request.GET %}
+
+{% else %}
+
+{% endif %}
+Log in again.
{% endblock %}
diff --git a/swh/web/tests/auth/__init__.py b/swh/web/tests/auth/__init__.py
new file mode 100644
diff --git a/swh/web/tests/auth/keycloak_mock.py b/swh/web/tests/auth/keycloak_mock.py
new file mode 100644
--- /dev/null
+++ b/swh/web/tests/auth/keycloak_mock.py
@@ -0,0 +1,79 @@
+# Copyright (C) 2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU Affero General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from copy import copy
+from unittest.mock import Mock
+
+from django.conf import settings
+from django.utils import timezone
+
+from swh.web.auth.keycloak import KeycloakOpenIDConnect
+from swh.web.config import get_config
+
+from .sample_data import oidc_profile, realm_public_key, userinfo
+
+
+class KeycloackOpenIDConnectMock(KeycloakOpenIDConnect):
+
+ def __init__(self, auth_success=True):
+ swhweb_config = get_config()
+ super().__init__(swhweb_config['keycloak']['server_url'],
+ swhweb_config['keycloak']['realm_name'],
+ settings.OIDC_SWH_WEB_CLIENT_ID,
+ realm_public_key)
+ self._keycloak.well_know = lambda: {
+ 'issuer': f'{self.server_url}realms/{self.realm_name}',
+ 'authorization_endpoint': (f'{self.server_url}realms/'
+ f'{self.realm_name}/protocol/'
+ 'openid-connect/auth'),
+ 'token_endpoint': (f'{self.server_url}realms/{self.realm_name}/'
+ 'protocol/openid-connect/token'),
+ 'token_introspection_endpoint': (f'{self.server_url}realms/'
+ f'{self.realm_name}/protocol/'
+ 'openid-connect/token/'
+ 'introspect'),
+ 'userinfo_endpoint': (f'{self.server_url}realms/{self.realm_name}/'
+ 'protocol/openid-connect/userinfo'),
+ 'end_session_endpoint': (f'{self.server_url}realms/'
+ f'{self.realm_name}/protocol/'
+ 'openid-connect/logout'),
+ 'jwks_uri': (f'{self.server_url}realms/{self.realm_name}/'
+ 'protocol/openid-connect/certs'),
+ }
+ self.authorization_code = Mock()
+ self.userinfo = Mock()
+ self.logout = Mock()
+ if auth_success:
+ self.authorization_code.return_value = copy(oidc_profile)
+ self.userinfo.return_value = copy(userinfo)
+ else:
+ self.authorization_url = Mock()
+ exception = Exception('Authentication failed')
+ self.authorization_code.side_effect = exception
+ self.authorization_url.side_effect = exception
+ self.userinfo.side_effect = exception
+ self.logout.side_effect = exception
+
+ def decode_token(self, token):
+ # skip signature expiration check as we use a static oidc_profile
+ # for the tests with expired tokens in it
+ options = {'verify_exp': False}
+ decoded = super().decode_token(token, options)
+ # tweak auth and exp time for tests
+ expire_in = decoded['exp'] - decoded['auth_time']
+ decoded['auth_time'] = int(timezone.now().timestamp())
+ decoded['exp'] = decoded['auth_time'] + expire_in
+ return decoded
+
+
+def mock_keycloak(mocker, auth_success=True):
+ kc_oidc_mock = KeycloackOpenIDConnectMock(auth_success)
+ mock_get_oidc_client = mocker.patch(
+ 'swh.web.auth.views.get_oidc_client')
+ mock_get_oidc_client.return_value = kc_oidc_mock
+ mock_get_oidc_client_backend = mocker.patch(
+ 'swh.web.auth.backends.get_oidc_client')
+ mock_get_oidc_client_backend.return_value = kc_oidc_mock
+ return kc_oidc_mock
diff --git a/swh/web/tests/auth/sample_data.py b/swh/web/tests/auth/sample_data.py
new file mode 100644
--- /dev/null
+++ b/swh/web/tests/auth/sample_data.py
@@ -0,0 +1,117 @@
+# Copyright (C) 2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU Affero General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+
+realm_public_key = {
+ 'alg': 'RS256',
+ 'e': 'AQAB',
+ 'kid': 'OJxUCJtNbPOCNPaM6g7eMscjjMxhzoo4lYhXlkW6MhA',
+ 'kty': 'RSA',
+ 'n': ('nqF4xvGjaI54P6WtJvyGayxP8A93uNcA3TH6jitwmyAalj8dN8_NzK9vrdlSA3Ibvp_'
+ 'XQujPSOP7a35YiYFscEJnogTXQpE_FhZrUYy21U6ezruVUv4z_ER1cYLb-q5ZI86nXS'
+ 'TNCAbH-lw7rQjlvcJ9KvgHEeA5ALXJ1r55zUmNvuy5o6ke1G3fXbNSXwF4qlWAzo1o7'
+ 'Ms8qNrNyOG8FPx24dvm9xMH7_08IPvh9KUqlnP8h6olpxHrdrX_q4E-Nzj8Tr8p7Z5C'
+ 'imInls40QuOTIhs6C2SwFHUgQgXl9hB9umiZJlwYEpDv0_LO2zYieHl5Lv7Iig4FOIX'
+ 'IVCaDGQ'),
+ 'use': 'sig',
+ 'x5c': ['MIICrzCCAZcCBgFvyCM+VzANBgkqhkiG9w0BAQsFADAbMRkwFwYDVQQDDBBTb2Z0d'
+ '2FyZUhlcml0YWdlMB4XDTIwMDEyMTEyNDQxMFoXDTMwMDEyMTEyNDU1MFowGzEZMB'
+ 'cGA1UEAwwQU29mdHdhcmVIZXJpdGFnZTCCASIwDQYJKoZIhvcNAQEBBQADggEPADC'
+ 'CAQoCggEBAJ6heMbxo2iOeD+lrSb8hmssT/APd7jXAN0x+o4rcJsgGpY/HTfPzcyv'
+ 'b63ZUgNyG76f10Loz0jj+2t+WImBbHBCZ6IE10KRPxYWa1GMttVOns67lVL+M/xEd'
+ 'XGC2/quWSPOp10kzQgGx/pcO60I5b3CfSr4BxHgOQC1yda+ec1Jjb7suaOpHtRt31'
+ '2zUl8BeKpVgM6NaOzLPKjazcjhvBT8duHb5vcTB+/9PCD74fSlKpZz/IeqJacR63a'
+ '1/6uBPjc4/E6/Ke2eQopiJ5bONELjkyIbOgtksBR1IEIF5fYQfbpomSZcGBKQ79Py'
+ 'zts2Inh5eS7+yIoOBTiFyFQmgxkCAwEAATANBgkqhkiG9w0BAQsFAAOCAQEAbxzPN'
+ 'LcfFimBQsDI+1VRj2xWOdfnJoFOkwfAxfHdjRzX6QZgYbyNs7p0b51DE8XnAzVArq'
+ 'RUFvLXBoRdy5KnMFADxuRWDPUfYBZD66OzasQtQ1aJAi/CoG6el6a+UbUicR+/6Mq'
+ '3u/9QNFT+uxsQ+/botz44HAJS57Eq++PXiIx9T4FJPiUIeNvCxzcnCVzTkFtZrVKR'
+ 'Ytjiu9xcWfeDhc0BK5WzTBO+GZ6XvetOHN/TGdCTPxv8IhLv3vhRtJNCkqABGlOQ9'
+ 'tB8SX80/RkQYViEl/UdluyiR9pkZ2KhcZO0Txn1p46CwX5fGpzVe9OUPu3TYgLLPU'
+ 'OzGMNpdQqtGQ=='],
+ 'x5t': '2lFnNGJe05ZC6IE9dw3AEqUL4zE',
+ 'x5t#S256': 'rmpCyJ_jt-gQp5k8PawzouvrTmXJ-TuaBPqH3sgX27o'
+}
+
+oidc_profile = {
+ 'access_token': ('eyJhbGciOiJSUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJPSnhV'
+ 'Q0p0TmJQT0NOUGFNNmc3ZU1zY2pqTXhoem9vNGxZaFhsa1c2TWhBIn0.'
+ 'eyJqdGkiOiIzMWZjNTBiNy1iYmU1LTRmNTEtOTFlZi04ZTNlZWM1MTMz'
+ 'MWUiLCJleHAiOjE1ODI3MjM3MDEsIm5iZiI6MCwiaWF0IjoxNTgyNzIz'
+ 'MTAxLCJpc3MiOiJodHRwOi8vbG9jYWxob3N0OjgwODAvYXV0aC9yZWFs'
+ 'bXMvU29mdHdhcmVIZXJpdGFnZSIsImF1ZCI6WyJzd2gtd2ViIiwiYWNj'
+ 'b3VudCJdLCJzdWIiOiJmZWFjZDM0NC1iNDY4LTRhNjUtYTIzNi0xNGY2'
+ 'MWU2YjcyMDAiLCJ0eXAiOiJCZWFyZXIiLCJhenAiOiJzd2gtd2ViIiwi'
+ 'YXV0aF90aW1lIjoxNTgyNzIzMTAwLCJzZXNzaW9uX3N0YXRlIjoiZDgy'
+ 'YjkwZDEtMGE5NC00ZTc0LWFkNjYtZGQ5NTM0MWM3YjZkIiwiYWNyIjoi'
+ 'MSIsImFsbG93ZWQtb3JpZ2lucyI6WyIqIl0sInJlYWxtX2FjY2VzcyI6'
+ 'eyJyb2xlcyI6WyJvZmZsaW5lX2FjY2VzcyIsInVtYV9hdXRob3JpemF0'
+ 'aW9uIl19LCJyZXNvdXJjZV9hY2Nlc3MiOnsiYWNjb3VudCI6eyJyb2xl'
+ 'cyI6WyJtYW5hZ2UtYWNjb3VudCIsIm1hbmFnZS1hY2NvdW50LWxpbmtz'
+ 'Iiwidmlldy1wcm9maWxlIl19fSwic2NvcGUiOiJvcGVuaWQgZW1haWwg'
+ 'cHJvZmlsZSIsImVtYWlsX3ZlcmlmaWVkIjpmYWxzZSwibmFtZSI6Ikpv'
+ 'aG4gRG9lIiwiZ3JvdXBzIjpbXSwicHJlZmVycmVkX3VzZXJuYW1lIjoi'
+ 'am9obmRvZSIsImdpdmVuX25hbWUiOiJKb2huIiwiZmFtaWx5X25hbWUi'
+ 'OiJEb2UiLCJlbWFpbCI6ImpvaG4uZG9lQGV4YW1wbGUuY29tIn0.neJ-'
+ 'Pmd87J6Gt0fzDqmXFeoy34Iqb5vNNEEgIKqtqg3moaVkbXrO_9R37DJB'
+ 'AgdFv0owVONK3GbqPOEICePgG6RFtri999DetNE-O5sB4fwmHPWcHPlO'
+ 'kcPLbVJqu6zWo-2AzlfAy5bCNvj_wzs2tjFjLeHcRgR1a1WY3uTp5EWc'
+ 'HITCWQZzZWFGZTZCTlGkpdyJTqxGBdSHRB4NlIVGpYSTBsBsxttFEetl'
+ 'rpcNd4-5AteFprIr9hn9VasIIF8WdFdtC2e8xGMJW5Q0M3G3Iu-LLNmE'
+ 'oTIDqtbJ7OrIcGBIwsc3seCV3eCG6kOYwz5w-f8DeOpwcDX58yYPmapJ'
+ '6A'),
+ 'expires_in': 600,
+ 'id_token': ('eyJhbGciOiJSUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJPSnhVQ0p0'
+ 'TmJQT0NOUGFNNmc3ZU1zY2pqTXhoem9vNGxZaFhsa1c2TWhBIn0.eyJqdGki'
+ 'OiI0NDRlYzU1My1iYzhiLTQ2YjYtOTlmYS0zOTc3YTJhZDY1ZmEiLCJleHAi'
+ 'OjE1ODI3MjM3MDEsIm5iZiI6MCwiaWF0IjoxNTgyNzIzMTAxLCJpc3MiOiJo'
+ 'dHRwOi8vbG9jYWxob3N0OjgwODAvYXV0aC9yZWFsbXMvU29mdHdhcmVIZXJp'
+ 'dGFnZSIsImF1ZCI6InN3aC13ZWIiLCJzdWIiOiJmZWFjZDM0NC1iNDY4LTRh'
+ 'NjUtYTIzNi0xNGY2MWU2YjcyMDAiLCJ0eXAiOiJJRCIsImF6cCI6InN3aC13'
+ 'ZWIiLCJhdXRoX3RpbWUiOjE1ODI3MjMxMDAsInNlc3Npb25fc3RhdGUiOiJk'
+ 'ODJiOTBkMS0wYTk0LTRlNzQtYWQ2Ni1kZDk1MzQxYzdiNmQiLCJhY3IiOiIx'
+ 'IiwiZW1haWxfdmVyaWZpZWQiOmZhbHNlLCJuYW1lIjoiSm9obiBEb2UiLCJn'
+ 'cm91cHMiOltdLCJwcmVmZXJyZWRfdXNlcm5hbWUiOiJqb2huZG9lIiwiZ2l2'
+ 'ZW5fbmFtZSI6IkpvaG4iLCJmYW1pbHlfbmFtZSI6IkRvZSIsImVtYWlsIjoi'
+ 'am9obi5kb2VAZXhhbXBsZS5jb20ifQ.YB7bxlz_wgLJSkylVjmqedxQgEMee'
+ 'JOdi9CFHXV4F3ZWsEZ52CGuJXsozkX2oXvgU06MzzLNEK8ojgrPSNzjRkutL'
+ 'aaLq_YUzv4iV8fmKUS_aEyiYZbfoBe3Y4dwv2FoPEPCt96iTwpzM5fg_oYw_'
+ 'PHCq-Yl5SulT1nTrJZpntkf0hRjmxlDO06JMp0aZ8xS8RYJqH48xCRf_DARE'
+ '0jJV2-UuzOWI6xBATwFfP44kV6wFmErLN5txMgwZzCSB2OCe5Cl1il0eTQTN'
+ 'ybeSYZeZE61QtuTRUHeP1D1qSbJGy5g_S67SdTkS-hQFvfrrD84qGflIEqnX'
+ 'ZbYnitD1Typ6Q'),
+ 'not-before-policy': 0,
+ 'refresh_expires_in': 1800,
+ 'refresh_token': ('eyJhbGciOiJIUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJmNjM'
+ 'zMDE5MS01YTU4LTQxMDAtOGIzYS00ZDdlM2U1NjA3MTgifQ.eyJqdGk'
+ 'iOiIxYWI5ZWZmMS0xZWZlLTQ3MDMtOGQ2YS03Nzg1NWUwYzQyYTYiLC'
+ 'JleHAiOjE1ODI3MjQ5MDEsIm5iZiI6MCwiaWF0IjoxNTgyNzIzMTAxL'
+ 'CJpc3MiOiJodHRwOi8vbG9jYWxob3N0OjgwODAvYXV0aC9yZWFsbXMv'
+ 'U29mdHdhcmVIZXJpdGFnZSIsImF1ZCI6Imh0dHA6Ly9sb2NhbGhvc3Q'
+ '6ODA4MC9hdXRoL3JlYWxtcy9Tb2Z0d2FyZUhlcml0YWdlIiwic3ViIj'
+ 'oiZmVhY2QzNDQtYjQ2OC00YTY1LWEyMzYtMTRmNjFlNmI3MjAwIiwid'
+ 'HlwIjoiUmVmcmVzaCIsImF6cCI6InN3aC13ZWIiLCJhdXRoX3RpbWUi'
+ 'OjAsInNlc3Npb25fc3RhdGUiOiJkODJiOTBkMS0wYTk0LTRlNzQtYWQ'
+ '2Ni1kZDk1MzQxYzdiNmQiLCJyZWFsbV9hY2Nlc3MiOnsicm9sZXMiOl'
+ 'sib2ZmbGluZV9hY2Nlc3MiLCJ1bWFfYXV0aG9yaXphdGlvbiJdfSwic'
+ 'mVzb3VyY2VfYWNjZXNzIjp7ImFjY291bnQiOnsicm9sZXMiOlsibWFu'
+ 'YWdlLWFjY291bnQiLCJtYW5hZ2UtYWNjb3VudC1saW5rcyIsInZpZXc'
+ 'tcHJvZmlsZSJdfX0sInNjb3BlIjoib3BlbmlkIGVtYWlsIHByb2ZpbG'
+ 'UifQ.xQYrl2CMP_GQ_TFqhsTz-rTs3WuZz5I37toi1eSsDMI'),
+ 'scope': 'openid email profile',
+ 'session_state': 'd82b90d1-0a94-4e74-ad66-dd95341c7b6d',
+ 'token_type': 'bearer'
+}
+
+userinfo = {
+ 'email': 'john.doe@example.com',
+ 'email_verified': False,
+ 'family_name': 'Doe',
+ 'given_name': 'John',
+ 'groups': [],
+ 'name': 'John Doe',
+ 'preferred_username': 'johndoe',
+ 'sub': 'feacd344-b468-4a65-a236-14f61e6b7200'
+}
diff --git a/swh/web/tests/auth/test_backends.py b/swh/web/tests/auth/test_backends.py
new file mode 100644
--- /dev/null
+++ b/swh/web/tests/auth/test_backends.py
@@ -0,0 +1,79 @@
+# Copyright (C) 2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU Affero General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from copy import copy
+from datetime import datetime, timedelta
+
+from django.contrib.auth import authenticate, get_backends
+
+import pytest
+
+from django.conf import settings
+
+from swh.web.auth.models import OIDCUser
+from swh.web.common.utils import reverse
+
+from .sample_data import oidc_profile, userinfo
+from .keycloak_mock import mock_keycloak
+
+
+def _authenticate_user(request_factory):
+ request = request_factory.get(reverse('oidc-login-complete'))
+
+ return authenticate(request=request,
+ code='some-code',
+ code_verifier='some-code-verifier',
+ redirect_uri='https://localhost:5004')
+
+
+def _check_authenticated_user(user):
+ assert user is not None
+ assert isinstance(user, OIDCUser)
+ assert user.pk == 0
+ assert user.id != 0
+ assert user.username == userinfo['preferred_username']
+ assert user.password == ''
+ assert user.first_name == userinfo['given_name']
+ assert user.last_name == userinfo['family_name']
+ assert user.email == userinfo['email']
+
+ assert hasattr(user, 'userinfo')
+ userinfo_cp = copy(userinfo)
+ userinfo_cp['user_id'] = user.id
+ assert user.userinfo == userinfo_cp
+
+
+@pytest.mark.django_db
+def test_oidc_code_pkce_auth_backend_success(mocker, request_factory):
+ kc_oidc_mock = mock_keycloak(mocker)
+ user = _authenticate_user(request_factory)
+
+ _check_authenticated_user(user)
+
+ assert hasattr(user, 'oidc_profile')
+ decoded_token = kc_oidc_mock.decode_token(oidc_profile['access_token'])
+ auth_datetime = datetime.fromtimestamp(decoded_token['auth_time'])
+ oidc_profile_cp = copy(oidc_profile)
+ oidc_profile_cp['expire_datetime'] = (
+ auth_datetime +
+ timedelta(seconds=oidc_profile['expires_in']))
+ oidc_profile_cp['refresh_expire_datetime'] = (
+ auth_datetime +
+ timedelta(seconds=oidc_profile['refresh_expires_in']))
+ assert user.oidc_profile == oidc_profile_cp
+
+ backend_path = 'swh.web.auth.backends.OIDCAuthorizationCodePKCEBackend'
+ assert user.backend == backend_path
+ backend_idx = settings.AUTHENTICATION_BACKENDS.index(backend_path)
+ assert get_backends()[backend_idx].get_user(user.id) == user
+
+
+@pytest.mark.django_db
+def test_oidc_code_pkce_auth_backend_failure(mocker, request_factory):
+ mock_keycloak(mocker, auth_success=False)
+
+ user = _authenticate_user(request_factory)
+
+ assert user is None
diff --git a/swh/web/tests/auth/test_utils.py b/swh/web/tests/auth/test_utils.py
new file mode 100644
--- /dev/null
+++ b/swh/web/tests/auth/test_utils.py
@@ -0,0 +1,38 @@
+# Copyright (C) 2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU Affero General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import hashlib
+import re
+
+from base64 import urlsafe_b64encode
+
+from swh.web.auth.utils import gen_oidc_pkce_codes
+
+
+def test_gen_oidc_pkce_codes():
+ """
+ Check generated PKCE codes respect the specification
+ (see https://tools.ietf.org/html/rfc7636#section-4.1)
+ """
+ code_verifier, code_challenge = gen_oidc_pkce_codes()
+
+ # check the code verifier only contains allowed characters
+ sub = re.sub(r'[a-zA-Z0-9-\._~]+', '', code_verifier)
+ assert len(sub) == 0
+
+ # check minimum and maximum authorized length for the
+ # code verifier
+ assert len(code_verifier) >= 43
+ assert len(code_verifier) <= 128
+
+ # compute code challenge from code verifier
+ challenge = hashlib.sha256(code_verifier.encode('utf-8')).digest()
+ challenge = urlsafe_b64encode(challenge).decode('utf-8')
+ challenge = challenge.replace('=', '')
+
+ # check base64 padding is not present
+ assert not code_challenge[-1].endswith('=')
+ # check code challenge is valid
+ assert code_challenge == challenge
diff --git a/swh/web/tests/auth/test_views.py b/swh/web/tests/auth/test_views.py
new file mode 100644
--- /dev/null
+++ b/swh/web/tests/auth/test_views.py
@@ -0,0 +1,246 @@
+# Copyright (C) 2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU Affero General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import uuid
+
+from django.conf import settings
+from django.http import QueryDict
+from django.contrib.auth.models import AnonymousUser, User
+
+import pytest
+
+from swh.web.auth.models import OIDCUser
+from swh.web.common.utils import reverse
+from swh.web.tests.django_asserts import assert_template_used, assert_contains
+
+from .keycloak_mock import mock_keycloak
+from .sample_data import oidc_profile
+
+
+@pytest.mark.django_db
+def test_oidc_login_views_success(client, mocker):
+ """
+ Simulate a successful login authentication with OpenID Connect
+ authorization code flow with PKCE.
+ """
+ # mock Keycloak client
+ kc_oidc_mock = mock_keycloak(mocker)
+
+ # user initiates login process
+ login_url = reverse('oidc-login')
+ response = client.get(login_url)
+ request = response.wsgi_request
+
+ # should redirect to Keycloak authentication page in order
+ # for a user to login with its username / password
+ assert response.status_code == 302
+ assert isinstance(request.user, AnonymousUser)
+
+ redirect_url = response['location']
+ redirect_url_split = redirect_url.split('?')
+ authorization_url = kc_oidc_mock.well_known()['authorization_endpoint']
+ query_dict = QueryDict(redirect_url_split[1])
+
+ # check redirect url is valid
+ assert redirect_url_split[0] == authorization_url
+ assert 'client_id' in query_dict
+ assert query_dict['client_id'] == settings.OIDC_SWH_WEB_CLIENT_ID
+ assert 'response_type' in query_dict
+ assert query_dict['response_type'] == 'code'
+ assert 'redirect_uri' in query_dict
+ assert query_dict['redirect_uri'] == reverse('oidc-login-complete',
+ request=request)
+ assert 'code_challenge_method' in query_dict
+ assert query_dict['code_challenge_method'] == 'S256'
+ assert 'scope' in query_dict
+ assert query_dict['scope'] == 'openid'
+ assert 'state' in query_dict
+ assert 'code_challenge' in query_dict
+
+ # check a nonce has been registered in user session
+ assert 'nonce' in request.session
+ nonce = request.session['nonce']
+ assert 'code_verifier' in nonce
+ assert 'state' in nonce
+ assert 'redirect_uri' in nonce
+ assert nonce['redirect_uri'] == query_dict['redirect_uri']
+
+ # once a user has identified himself in Keycloak, he is
+ # redirected to the 'oidc-login-complete' view to
+ # login in Django.
+
+ # generate authorization code / session state in the same
+ # manner as Keycloak
+ code = f'{str(uuid.uuid4())}.{str(uuid.uuid4())}.{str(uuid.uuid4())}'
+ session_state = str(uuid.uuid4())
+
+ login_complete_url = reverse('oidc-login-complete',
+ query_params={'code': code,
+ 'state': nonce['state'],
+ 'session_state': session_state})
+
+ # login process finalization
+ response = client.get(login_complete_url)
+ request = response.wsgi_request
+
+ # should redirect to root url by default
+ assert response.status_code == 302
+ assert response['location'] == request.build_absolute_uri('/')
+
+ # user should be authenticated
+ assert isinstance(request.user, OIDCUser)
+
+ # check remoter used has not been saved to Django database
+ with pytest.raises(User.DoesNotExist):
+ User.objects.get(username=request.user.username)
+
+
+@pytest.mark.django_db
+def test_oidc_logout_view_success(client, mocker):
+ """
+ Simulate a successful logout operation with OpenID Connect.
+ """
+ # mock Keycloak client
+ kc_oidc_mock = mock_keycloak(mocker)
+ # login our test user
+ client.login(code='', code_verifier='', redirect_uri='')
+ kc_oidc_mock.authorization_code.assert_called()
+
+ # user initiates logout
+ oidc_logout_url = reverse('oidc-logout')
+ response = client.get(oidc_logout_url)
+ request = response.wsgi_request
+
+ # should redirect to logout page
+ assert response.status_code == 302
+ logout_url = reverse('logout', query_params={'remote_user': 1})
+ assert response['location'] == request.build_absolute_uri(logout_url)
+
+ # should have been logged out in Keycloak
+ kc_oidc_mock.logout.assert_called_with(oidc_profile['refresh_token'])
+
+ # check effective logout in Django
+ assert(isinstance(request.user, AnonymousUser))
+
+
+@pytest.mark.django_db
+def test_oidc_login_view_failure(client, mocker):
+ """
+ Simulate a failed authentication with OpenID Connect.
+ """
+ # mock Keycloak client
+ mock_keycloak(mocker, auth_success=False)
+
+ # user initiates login process
+ login_url = reverse('oidc-login')
+ response = client.get(login_url)
+ request = response.wsgi_request
+
+ # should render an error page
+ assert response.status_code == 500
+ assert_template_used(response, "error.html")
+
+ # no users should be logged int
+ assert isinstance(request.user, AnonymousUser)
+
+
+@pytest.mark.django_db
+def test_oidc_login_complete_view_failures(client, mocker):
+ """
+ Simulate possible errors with OpenID Connect in the login complete view.
+ """
+ # mock Keycloak client
+ mock_keycloak(mocker, auth_success=False)
+
+ # user initiates login process
+ login_url = reverse('oidc-login-complete')
+ response = client.get(login_url)
+ request = response.wsgi_request
+
+ # should render an error page
+ assert_template_used(response, "error.html")
+ assert_contains(response, 'Login process has not been initialized.',
+ status_code=500)
+
+ # no user should be logged in
+ assert isinstance(request.user, AnonymousUser)
+
+ # simulate login process has been initialized
+ session = client.session
+ session['nonce'] = {
+ 'code_verifier': '',
+ 'state': str(uuid.uuid4()),
+ 'redirect_uri': '',
+ 'next_path': None,
+ }
+ session.save()
+
+ response = client.get(login_url)
+ request = response.wsgi_request
+
+ # should render an error page
+ assert_template_used(response, "error.html")
+ assert_contains(response, 'Missing query parameters for authentication.',
+ status_code=400)
+
+ # no user should be logged in
+ assert isinstance(request.user, AnonymousUser)
+
+ # check CSRF attack detection
+ login_url = reverse('oidc-login-complete',
+ query_params={'code': 'some-code',
+ 'state': 'some-state'})
+
+ response = client.get(login_url)
+ request = response.wsgi_request
+
+ # should render an error page
+ assert_template_used(response, "error.html")
+ assert_contains(response, 'CSRF attack detected, aborting login process.',
+ status_code=400)
+
+ # no user should be logged in
+ assert isinstance(request.user, AnonymousUser)
+
+ # check authentication errors are reported
+ login_url = reverse('oidc-login-complete',
+ query_params={'code': 'some-code',
+ 'state': session['nonce']['state']})
+
+ response = client.get(login_url)
+ request = response.wsgi_request
+
+ # should render an error page
+ assert response.status_code == 500
+ assert_template_used(response, "error.html")
+
+ # no user should be logged in
+ assert isinstance(request.user, AnonymousUser)
+
+
+@pytest.mark.django_db
+def test_oidc_logout_view_failure(client, mocker):
+ """
+ Simulate a failed logout operation with OpenID Connect.
+ """
+ # mock Keycloak client
+ kc_oidc_mock = mock_keycloak(mocker)
+ # login our test user
+ client.login(code='', code_verifier='', redirect_uri='')
+
+ err_msg = 'Authentication server error'
+ kc_oidc_mock.logout.side_effect = Exception(err_msg)
+
+ # user initiates logout process
+ logout_url = reverse('oidc-logout')
+ response = client.get(logout_url)
+ request = response.wsgi_request
+
+ # should render an error page
+ assert_template_used(response, "error.html")
+ assert_contains(response, err_msg, status_code=500)
+
+ # user should be logged out from Django anyway
+ assert isinstance(request.user, AnonymousUser)
diff --git a/swh/web/urls.py b/swh/web/urls.py
--- a/swh/web/urls.py
+++ b/swh/web/urls.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2017-2019 The Software Heritage developers
+# Copyright (C) 2017-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
@@ -8,7 +8,9 @@
from django.conf.urls import (
url, include, handler400, handler403, handler404, handler500
)
+from django.contrib.auth.views import LogoutView
from django.contrib.staticfiles.views import serve
+
from django.shortcuts import render
from django.views.generic.base import RedirectView
@@ -40,6 +42,10 @@
url(r'^(?Pswh:[0-9]+:[a-z]+:[0-9a-f]+.*)/$',
swh_id_browse, name='browse-swh-id'),
url(r'^', include('swh.web.misc.urls')),
+ url(r'^', include('swh.web.auth.views')),
+ url(r'^logout/$',
+ LogoutView.as_view(template_name='logout.html'),
+ name='logout'),
]